Skip to content

Cursor API fix#92

Merged
moltude merged 6 commits intomainfrom
fix/api-exhaustion
Dec 8, 2025
Merged

Cursor API fix#92
moltude merged 6 commits intomainfrom
fix/api-exhaustion

Conversation

@moltude
Copy link
Copy Markdown
Contributor

@moltude moltude commented Dec 8, 2025

Opus 4.5 work to try and fix part of the connection exhaustion through changes to AKKA and query concurrency.

PR: Increase Akka HTTP pool capacity and add ES request concurrency limiting

Problem

Production outages caused by BufferOverflowException when heavy search traffic overwhelms the Akka HTTP client pool. The default pool settings (max-connections=4, max-open-requests=32) are too small for burst traffic, causing requests to queue up and overflow even though Elasticsearch is healthy and responding (albeit with 300-700ms latency on complex queries).

Solution

1. Increase Akka HTTP connection pool capacity (application.conf)

  • max-connections: 4 → 16
  • max-open-requests: 32 → 128

2. Add semaphore-based concurrency limiting (ElasticSearchClient.scala)

  • Caps concurrent in-flight ES requests to 32 per API instance
  • Prevents unbounded request buildup that leads to pool overflow
  • Minimal code change (~15 lines) wrapping existing Http().singleRequest() calls

Configuration

All values are tunable via environment variables without code changes:

Variable Default Description
AKKA_HTTP_MAX_CONNECTIONS 16 HTTP pool connections to ES
AKKA_HTTP_MAX_OPEN_REQUESTS 128 Max queued requests in pool
ES_MAX_CONCURRENT_REQUESTS 32 Semaphore permits for ES calls

Rollback

  1. Config: Remove akka.http.host-connection-pool block from application.conf
  2. Code: Remove semaphore + withConcurrencyLimit wrapper, revert to direct Http().singleRequest() calls

Testing

  • No changes to request/response behavior
  • Existing tests should pass unchanged
  • Monitor in production for reduced 5xx rate and absence of BufferOverflowException

Related

This is Phase 1 of ES performance remediation. Phase 2 (query optimizations) will follow once production is stabilized.

Summary by CodeRabbit

  • Chores
    • Added tunable backend connection settings with environment-variable overrides for HTTP connections.
    • Added configurable concurrency limits and timeouts for search-related requests to control throughput.
    • Expected outcome: more stable and predictable search behavior under load with operator-configurable performance controls.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Dec 8, 2025

Caution

Review failed

The pull request is closed.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds Akka-HTTP host-connection-pool settings and a semaphore-based concurrency limiter around ElasticSearch HTTP requests, configurable via environment variables and applied to all internal ES request paths without changing public APIs.

Changes

Cohort / File(s) Summary
Configuration
src/main/resources/application.conf
Adds akka.http.host-connection-pool block with max-connections and max-open-requests plus environment-variable overrides and explanatory comments.
ElasticSearch Concurrency Control
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Introduces environment-driven semaphore settings (ES_MAX_CONCURRENT_REQUESTS, ES_SEMAPHORE_TIMEOUT_SECONDS), logger-backed validation, a concurrency limiter (withConcurrencyLimit) that wraps Http().singleRequest calls, and applies the limiter across all ES request paths (processSearch, processFetch, processMultiFetch, processRandom). Minor import/formatting adjustments included.

Sequence Diagram(s)

mermaid
sequenceDiagram
participant Caller
participant ElasticSearchClient
participant Semaphore
participant AkkaHttp as Akka-Http

Caller->>ElasticSearchClient: invoke search/fetch/multiFetch/random
ElasticSearchClient->>Semaphore: tryAcquire (with timeout)
alt permit acquired
    ElasticSearchClient->>AkkaHttp: Http().singleRequest(...)
    AkkaHttp-->>ElasticSearchClient: Future[HttpResponse]
    ElasticSearchClient->>Semaphore: release
    ElasticSearchClient-->>Caller: return Future result
else timeout/failure
    Semaphore-->>ElasticSearchClient: acquisition failed/timeout
    ElasticSearchClient-->>Caller: fail Future (timeout/error)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect semaphore initialization, default values, and env-var parsing/validation.
  • Verify withConcurrencyLimit correctly handles timeouts, exceptions, cancellations, and always releases permits.
  • Confirm all ES request call sites are consistently wrapped and none were missed.
  • Validate ExecutionContext usage to avoid blocking or thread-pool starvation and check logging/metrics paths.

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Title check ❓ Inconclusive The PR title 'Cursor API fix' is vague and does not clearly convey the main changes addressing connection exhaustion through pool capacity increases and semaphore-based concurrency limiting. Consider a more descriptive title such as 'Add concurrency limiter and increase connection pool limits to prevent exhaustion' or 'Mitigate connection exhaustion with pool tuning and ES request limiting'.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 939ef26 and 42aec04.

📒 Files selected for processing (1)
  • src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (9 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses production connection exhaustion issues by increasing Akka HTTP connection pool capacity and implementing semaphore-based concurrency limiting for Elasticsearch requests. The changes aim to prevent BufferOverflowException errors during traffic bursts by expanding pool capacity from 4 to 16 connections and max open requests from 32 to 128, while also capping concurrent ES requests at 32 per instance.

Key Changes:

  • Increased Akka HTTP pool settings (max-connections: 4→16, max-open-requests: 32→128) with environment variable overrides
  • Added Java Semaphore-based concurrency limiter wrapping all ES HTTP requests
  • Made concurrency limit configurable via ES_MAX_CONCURRENT_REQUESTS environment variable (default: 32)

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
src/main/resources/application.conf Configures increased Akka HTTP connection pool capacity with environment variable overrides
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Implements semaphore-based concurrency limiting for all ES request types (search, fetch, multi-fetch, random)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/main/resources/application.conf (1)

14-22: Akka HTTP pool config and env overrides look consistent

The new akka.http.host-connection-pool block follows the same “default then optional env override” pattern used elsewhere in this file, and the 16/128 defaults match the PR description. No issues from a config/syntax standpoint; just ensure ops tunes AKKA_HTTP_MAX_CONNECTIONS / AKKA_HTTP_MAX_OPEN_REQUESTS based on ES and JVM capacity in each environment.

src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (2)

20-35: Semaphore limiter works but consider guarding synchronous failures and avoiding blocking on the dispatcher

The semaphore-based limiter is a simple way to cap in‑flight ES requests per JVM, but there are two caveats:

  1. Blocking acquire on the default dispatcher
    semaphore.acquire() blocks the calling thread. Under sustained overload, many session actors can end up blocked here, tying up dispatcher threads and reducing overall throughput. It’s not a correctness bug, but it does work against Akka’s non‑blocking design and can become a bottleneck at high QPS.

  2. No protection if f throws synchronously
    If the by‑name f throws before returning a Future (even if unlikely for Http().singleRequest), the permit is never released.

You can make the helper more robust with minimal change by guarding f and still keep the current design:

 private def withConcurrencyLimit[T](f: => Future[T])
                                    (implicit ec: ExecutionContext): Future[T] = {
-    semaphore.acquire()
-    f.andThen { case _ => semaphore.release() }(ec)
+    semaphore.acquire()
+    val future =
+      try f
+      catch {
+        case scala.util.control.NonFatal(e) =>
+          semaphore.release()
+          throw e
+      }
+    future.andThen { case _ => semaphore.release() }(ec)
 }

Longer‑term, if this limiter starts engaging frequently, consider a non‑blocking approach (e.g., acquiring permits on a dedicated blocking dispatcher, or using an async/stream‑based limiter) to avoid occupying default dispatcher threads while waiting for capacity.


12-13: Consistent concurrency wiring around ES calls looks good

The added imports, per‑session implicit val ec = system.executionContext, and use of withConcurrencyLimit { Http().singleRequest(...) } in all four paths (search, fetch, multi‑fetch, random) are cohesive and keep ES concurrency control centralized in one helper. This keeps behavior unchanged from the caller’s perspective while giving you a single place to tune concurrency.

If you find the implicit ec boilerplate repetitive across the process* methods, you could later factor a small shared helper that takes ActorContext and an HttpRequest and returns the Future[HttpResponse] under the limiter, but that’s optional and not required for this fix.

Also applies to: 101-103, 111-113, 157-158, 161-163, 209-210, 218-220, 263-264, 272-274

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ea8a9ca and 02ac489.

📒 Files selected for processing (2)
  • src/main/resources/application.conf (1 hunks)
  • src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Agent

Parameter validation

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 02ac489 and 93ec885.

📒 Files selected for processing (1)
  • src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (8 hunks)
🔇 Additional comments (5)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (5)

12-13: LGTM!

The imports are appropriate for the semaphore-based concurrency limiting feature.


20-35: Past review concerns addressed.

The environment variable parsing now includes proper error handling for invalid numeric values and validation for positive integers. The implementation correctly addresses the concerns raised in previous reviews.


44-66: Past review concerns fully addressed.

The withConcurrencyLimit implementation now correctly handles all previously flagged issues:

  • Uses tryAcquire with timeout instead of blocking acquire(), preventing actor thread blocking
  • Includes try-catch to ensure permit release even if Future construction fails
  • Provides clear error messaging when concurrency limit is exceeded

The implementation is robust and addresses concurrency concerns appropriately.


133-133: LGTM!

The ExecutionContext is correctly added as an implicit parameter (required by withConcurrencyLimit), and the ES HTTP request is properly wrapped with the concurrency limiter.

Also applies to: 142-144


188-188: LGTM!

The concurrency limiter is consistently applied across all ES request paths (fetch, multi-fetch, and random), with the necessary ExecutionContext implicit parameter added to each method. The implementation maintains consistency with the search request pattern.

Also applies to: 192-194, 240-240, 249-251, 294-294, 303-305

Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/resources/application.conf
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (1)

74-74: Fix error message type inconsistency.

The error message states "must be positive integer" but semaphoreTimeoutSeconds is declared as Long. This inconsistency was flagged in previous reviews but remains uncorrected.

Apply this diff:

-              s"Invalid value for $envVar: '$value' (must be positive integer). " +
+              s"Invalid value for $envVar: '$value' (must be positive long). " +

Same issue exists at line 80:

-              s"Invalid value for $envVar: '$value' (not a valid integer). " +
+              s"Invalid value for $envVar: '$value' (not a valid long). " +
🧹 Nitpick comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (1)

90-92: Add test coverage for concurrency limiting functionality.

The new concurrency limiting logic lacks test coverage. Given that this repository has comprehensive automated testing (as noted in past reviews), consider adding tests to verify:

  1. Permit acquisition and release under normal operation
  2. Timeout behavior when concurrency limit is exceeded
  3. Proper permit release on both successful and failed requests
  4. Configuration parsing with invalid/missing environment variables
  5. Behavior under concurrent load (stress testing)

While manual/production testing may suffice for Phase 1 remediation, automated tests would:

  • Prevent regressions during Phase 2 query optimizations
  • Verify the ConcurrencyLimiter implementation is correct
  • Ensure consistent behavior across the codebase

Would you like me to generate a test skeleton for the ConcurrencyLimiter class once it's implemented?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 93ec885 and 939ef26.

📒 Files selected for processing (1)
  • src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (9 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (2)
src/main/scala/dpla/api/v2/search/SearchProtocol.scala (15)
  • search (61-64)
  • search (66-70)
  • search (72-75)
  • search (77-80)
  • search (82-86)
  • search (88-91)
  • search (93-97)
  • search (99-104)
  • search (106-109)
  • search (111-115)
  • search (117-121)
  • search (123-127)
  • search (129-132)
  • search (134-138)
  • SearchProtocol (8-139)
src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala (2)
  • ElasticSearchResponseHandler (29-120)
  • ProcessElasticSearchResponse (33-36)
🪛 GitHub Actions: Scala CI
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala

[error] 90-92: sbt clean coverage test coverageReport failed. Compilation errors: not found: type ConcurrencyLimiter; not found: value maxConcurrent; not found: value timeoutSeconds in ElasticSearchClient.scala (lines 90-92).

🔇 Additional comments (7)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (7)

25-26: LGTM: Imports support concurrency changes.

The ExecutionContext import is necessary for the implicit ec parameters added to each handler method, and the LoggerFactory import supports the error logging added for configuration validation.


32-32: LGTM: Logger properly initialized.

The SLF4J logger is correctly initialized and used appropriately for configuration validation errors.


40-61: LGTM: Configuration parsing is robust.

The environment variable parsing correctly handles invalid inputs with safe fallbacks. The validation ensures positive values and provides clear error messages when falling back to defaults.


190-192: Verify ConcurrencyLimiter API supports this usage pattern.

The code uses concurrencyLimiter { ... } syntax, which requires ConcurrencyLimiter to either:

  1. Have an apply method: def apply[T](f: => Future[T]): Future[T]
  2. Be assigned to a function value rather than a class instance

When implementing the missing ConcurrencyLimiter class, ensure its API supports this by-name parameter pattern. Based on past reviews, this should wrap the HTTP request with semaphore-based concurrency control.

The implementation should handle:

  • Acquiring permits with timeout (using semaphoreTimeoutSeconds)
  • Releasing permits on both success and failure
  • Avoiding permit leaks if Future construction fails
  • Non-blocking operation to avoid starving Akka threads

181-181: LGTM: ExecutionContext properly added to all handlers.

Each handler method correctly adds an implicit ExecutionContext derived from the actor system. This is necessary for Future operations and follows Akka best practices by using system.executionContext.

Also applies to: 235-235, 285-285, 338-338


166-169: LGTM: Documentation improvements.

The added Scaladoc comments correctly describe the per-session actor pattern, clarifying that each session has its own internal state and ActorRef.

Also applies to: 271-274


90-92: Ensure ConcurrencyLimiter addresses past review concerns.

Previous reviews raised several critical concerns about the concurrency limiting implementation (semaphore blocking, permit leaks, thread starvation). Those details are no longer visible in this file, suggesting they were refactored into the missing ConcurrencyLimiter class.

When providing the ConcurrencyLimiter implementation, ensure it addresses:

  1. Non-blocking operation: Avoid acquire() which blocks Akka threads; use tryAcquire() with timeout
  2. Permit leak prevention: Ensure permits are released even if Future construction fails, not just in andThen callback
  3. Thread safety: Properly handle concurrent access to the semaphore
  4. Timeout handling: Fail fast when semaphoreTimeoutSeconds is exceeded rather than blocking indefinitely
  5. Akka integration: Consider using Akka's native backpressure mechanisms or a dedicated blocking dispatcher

Past reviews suggested patterns like:

def apply[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
  if (!semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) {
    Future.failed(new TimeoutException("ES concurrency limit exceeded"))
  } else {
    try {
      val future = f
      future.onComplete(_ => semaphore.release())(ec)
      future
    } catch {
      case e: Throwable =>
        semaphore.release()
        Future.failed(e)
    }
  }
}

Comment thread src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Outdated
@moltude moltude merged commit 5d79d40 into main Dec 8, 2025
4 of 5 checks passed
@DominicBM DominicBM deleted the fix/api-exhaustion branch April 19, 2026 15:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants